Shuffling시 Skew 처리 방식과 Spark ML에 대한 파트
spark.sparkContext.broadcast를 사용
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
def my_func(code: str) -> str:
# return prdCode.get(code)
return bdData.value.get(code)
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Demo") \
.master("local[3]") \
.getOrCreate()
prdCode = spark.read.csv("data/lookup.csv").rdd.collectAsMap()
bdData = spark.sparkContext.broadcast(prdCode)
data_list = [("98312", "2021-01-01", "1200", "01"),
("01056", "2021-01-02", "2345", "01"),
("98312", "2021-02-03", "1200", "02"),
("01056", "2021-02-04", "2345", "02"),
("02845", "2021-02-05", "9812", "02")]
df = spark.createDataFrame(data_list) \
.toDF("code", "order_date", "price", "qty")
spark.udf.register("my_udf", my_func, StringType())
df.withColumn("Product", expr("my_udf(code)")) \
.show()
PySpark를 사용하기 위해 세션을 생성함. 이 세션은 로컬 모드에서 동작하며, 'PySpark Accumulator'라는 애플리케이션 이름을 가짐.
from pyspark.sql import SparkSession
spark = SparkSession.builder\
.master("local[*]")\
.appName('PySpark Accumulator')\
.getOrCreate()
California 주의 도시와 zipcode 정보를 담은 데이터 리스트를 생성하고, 이를 DataFrame으로 변환함.
data_list = [
("California", "Sunnyvale", 9511),
("California", "Mountain View", 94111),
("California", "Cupertino", 94123),
("California", "San Jose", 951)
]
df = spark.createDataFrame(data_list) \
.toDF("state", "city", "zipcode")
df.show()
출력:
+----------+-------------+-------+
| state| city|zipcode|
+----------+-------------+-------+
|California| Sunnyvale| 9511|
|California|Mountain View| 94111|
|California| Cupertino| 94123|
|California| San Jose| 951|
+----------+-------------+-------+
잘못된 zipcode의 개수를 세기 위해 accumulator를 생성함.
bad_zipcodes = spark.sparkContext.accumulator(0)
잘못된 zipcode를 처리하는 함수를 정의하고 UDF로 등록함. 이 함수는 zipcode가 5자리인지 확인하고, 그렇지 않으면 accumulator의 값을 1 증가시킴.
def handle_bad_zipcode(c: int) -> int:
if len(str(c)) != 5:
bad_zipcodes.add(1)
return None
return c
spark.udf.register("handle_bad_zipcode", handle_bad_zipcode, IntegerType())
UDF를 사용하여 zipcode를 검사하고, 잘못된 경우 null로 대체한 새로운 컬럼을 추가함.
df.withColumn("corrected_zipcode", expr("handle_bad_zipcode(zipcode)")) \
.show()
출력:
+----------+-------------+-------+-----------------+
| state| city|zipcode|corrected_zipcode|
+----------+-------------+-------+-----------------+
|California| Sunnyvale| 9511| null|
|California|Mountain View| 94111| 94111|
|California| Cupertino| 94123| 94123|
|California| San Jose| 951| null|
+----------+-------------+-------+-----------------+
accumulator의 값을 출력하여 잘못된 zipcode의 개수를 확인함.
print("Bad Record Count:" + str(bad_zipcodes.value))
출력:
Bad Record Count: 2
zipcode 컬럼을 수정하고, 잘못된 zipcode를 null로 대체한 DataFrame을 생성함.
df.withColumn("corrected_zipcode", expr("handle_bad_zipcode(zipcode)")). \
select("state", "city", "corrected_zipcode"). \
withColumnRenamed("corrected_zipcode", "zipcode").show()
출력:
+----------+-------------+-------+
| state| city|zipcode|
+----------+-------------+-------+
|California| Sunnyvale| null|
|California|Mountain View| 94111|
|California| Cupertino| 94123|
|California| San Jose| null|
+----------+-------------+-------+
DataFrame의 각 값을 accumulator에 더하는 예제를 수행함.
data = [1, 2, 3, 4, 5]
df_test = spark.createDataFrame(data, "int").toDF("value")
accumulator = spark.sparkContext.accumulator(0)
def add_to_accumulator(row):
global accumulator
accumulator += row["value"]
df_test.foreach(add_to_accumulator)
print("Accumulator value: ", accumulator.value)
출력:
Accumulator value: 15
DataFrame의 각 행을 검사하여 잘못된 zipcode를 찾는 예제를 수행함.
accumulator_zipcode = spark.sparkContext.accumulator(0)
def find_wrong_zipcode(row):
global accumulator_zipcode
accumulator_zipcode += 1 if len(str(row["zipcode"])) != 5 else 0
df.foreach(find_wrong_zipcode)
print("Wrong zipcode: ", accumulator_zipcode.value)
출력:
Wrong zipcode: 2

| 환경 변수 이름 | Default 값 | LinkedIn 설정 값 |
|---|---|---|
| spark.speculation.interval | 100ms | 1 sec |
| spark.speculation.multiplier | 1.5 | 4 |
| spark.speculation.quantile | 0.75 | 0.9 |
| spark.speculation.minTaskRuntime | 100ms | 30 sec |
| spark.speculation.task.duration.threshold | None | None |
spark.speculation.intervalspark.speculation.multiplierspark.speculation.quantilespark.speculation.minTaskRuntimespark.speculation.task.duration.threshold크게 2개의 관점이 있음.

Dynamic Resource Allocation은 스파크 클러스터에서 실행 중인 애플리케이션의 자원을 동적으로 조절하여 효율성을 극대화하는 기능임. 아래 환경 변수들로 제어됨.
spark.dynamicAllocation.enabledtrue 또는 falsetrue로 설정하면 Dynamic Resource Allocation이 활성화되어 실행 중인 애플리케이션이 필요에 따라 자원을 동적으로 조절함.spark.dynamicAllocation.shuffleTracking.enabledtrue 또는 falsetrue로 설정하면 Dynamic Resource Allocation이 shuffle 데이터를 추적하고, shuffle 데이터가 더 이상 필요하지 않으면 해당 executors를 해제함.spark.dynamicAllocation.executorIdleTimeoutspark.dynamicAllocation.schedulerBacklogTimeoutspark.dynamicAllocation.minExecutorsspark.dynamicAllocation.maxExecutorsspark.dynamicAllocation.initialExecutorsspark.dynamicAllocation.executorAllocationRatio
Spark.schheduler.mode : FIFO(default) 혹은 FAIRspark.scheduler.allocation.file : "FAIR"인 경우 필요하며, 풀을 정의해놓은 형태로 사용됨.Driver는 스파크 애플리케이션의 중심 역할을 하며, 스파크 클러스터와 상호작용하고 애플리케이션 실행을 관리함.
collect, save, count)을 처리하여 최종 데이터를 제공함.Executor는 스파크 애플리케이션에서 실제 작업을 수행하는 구성 요소로, 주로 다음과 같은 역할을 함:
Executor는 스파크 애플리케이션에서 실제 데이터 처리를 담당하는 구성 요소임. 다음과 같은 역할을 수행함:
이러한 역할을 통해 Executor는 스파크 클러스터에서 대규모 데이터를 분산 처리하고, 높은 성능을 유지함.

Spark 세션을 생성하여 로컬 모드에서 3개의 코어를 사용하도록 설정함.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("Spark Driver OOM Demo") \
.getOrCreate()
작은 범위의 데이터를 생성하고 이를 수집하여 정상 작동을 확인함.
df = spark.range(1000)
d = df.collect()
대규모 데이터를 생성하고 이를 수집하려고 시도함. 이로 인해 메모리 부족 문제가 발생함.
df = spark.range(100000000)
d = df.collect()
오류 메시지 (Out of Memory 부분):
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.base/java.util.Arrays.copyOf(Arrays.java:3745)
at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:120)
at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
at net.jpountz.lz4.LZ4BlockOutputStream.flushBufferedData(LZ4BlockOutputStream.java:225)
at net.jpountz.lz4.LZ4BlockOutputStream.write(LZ4BlockOutputStream.java:178)
at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.spark.sql.catalyst.expressions.UnsafeRow.writeToStream(UnsafeRow.java:542)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:367)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
...
collect() 연산은 모든 데이터를 드라이버 메모리로 가져오므로 대규모 데이터셋의 경우 메모리 부족 문제가 발생할 수 있음.메모리 증가: 드라이버와 실행기의 메모리를 증가시켜 문제를 해결할 수 있음.
spark = SparkSession \
.builder \
.master("local[3]") \
.appName("Spark Driver OOM Demo") \
.config("spark.driver.memory", "4g") \
.config("spark.executor.memory", "4g") \
.getOrCreate()
데이터 처리를 분산시킴: collect() 대신 다른 방식으로 데이터를 처리하여 메모리 부담을 줄임.
df = spark.range(100000000)
df.write.csv("/path/to/output")
필요한 데이터만 수집: 필요한 데이터만 필터링하여 수집함.
filtered_df = df.filter(df.id < 1000)
d = filtered_df.collect()
이를 통해 메모리 부족 문제를 해결하고 대규모 데이터셋도 효율적으로 처리할 수 있음.