Livy 에 대해서
- spark cluster 에 REST API 로 상호작용하는 프로젝트
- 글 작성 시점에선 apache incubator 프로젝트
Livy 로 spark submit 하기
POST /batches
API 로 spark 앱을 submit 할 수 있다.
file
: main 파일 경로 (required)
className
: 실행할 class name (jar 로 submit 할시 required)
queue
: YARN queue name
pyFiles
: pyspark 앱일 경우 .py
파일들
files
: config 등 사용될 파일들
args
: main 에 전달할 sys.argv
인자
HDFS 에 python 코드 업로드하고 livy 로 submit하기
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder \
.appName("ZeppelinEquivalentJob") \
.enableHiveSupport() \
.getOrCreate()
spark.sql('USE dw')
df = spark.table("user_profile")
df2 = df.select(col('user_id'), col('nickname').alias('name'))
df2.write.mode("append").saveAsTable("feature_store.demo_user_table")
- hdfs 에 webhdfs API 로 python 코드 업로드
import requests
from requests.auth import HTTPBasicAuth
KNOX_URL = "https://<host>:<port>/gateway/default/webhdfs/v1"
AUTH = HTTPBasicAuth("<id>", "<password>")
HDFS_PATH = "/tmp/anjinwoong/etl_job_test.py"
r1 = requests.put(
f"{KNOX_URL}{HDFS_PATH}",
params={'op': 'CREATE', 'overwrite': 'true', 'user.name': 'jungahn'},
allow_redirects=False,
auth=AUTH,
verify=False
)
assert r1.status_code == 307, f"CREATE 단계 실패: {r1.text}"
with open("src/etl_job_test.py", "rb") as f:
r2 = requests.put(r1.headers['Location'], data=f, auth=AUTH, verify=False)
print(r2.status_code)
assert r2.status_code == 201, f"업로드 실패: {r2.text}"
print("HDFS 업로드 완료")
POST /batches
API 로 pyspark 실행
- log 조회의 경우 직후에 조회시 accpeted 되기 전 혹은 직후일 가능성이 높으므로, 따로 분리하여 id 로 조회 권장
import requests
import json
from requests.auth import HTTPBasicAuth
livy_url = "https://<host>:<port>/gateway/default/livy/v1/batches"
auth = HTTPBasicAuth("<id>", "<password>")
payload = {
"file": "hdfs:///tmp/anjinwoong/etl_job_test.py",
"name": "pyspark-hdfs-job",
"proxyUser": "<id>",
"executorCores": 2,
"executorMemory": "1g",
"numExecutors": 2,
"driverMemory": "1g",
"conf": {
"spark.yarn.submit.waitAppCompletion": "true",
"spark.pyspark.python": "/usr/bin/python3",
"spark.yarn.queue": "default",
}
}
headers = {
"Content-Type": "application/json"
}
response = requests.post(
livy_url,
auth=auth,
headers=headers,
data=json.dumps(payload),
verify=False
)
print("Status:", response.status_code)
print("Response:", response.json())
batch_id = response.json()["id"]
log_resp = requests.get(
f"{livy_url}/{batch_id}/log?from=0&size=1000",
auth=auth,
verify=False
)
for line in log_resp.json().get("log", []):
print(line)
reference