고수준 API에서 제공하지 않는 기능이 필요한 경우
스파크의 모든 워크로드는 저수준 기능을 사용하는 기초적인 형태로 컴파일되므로 이를 이해하는 것은 많은 도움이 될 수 있음
spark.sparkContext
Out[1]:
SparkContext
Spark UI
Version v3.1.0
Master local[8]
AppName Databricks Shell
불변성을 가지며 병렬로 처리할 수 있는 파티셔닝된 레코드의 모음
RDD의 레코드는 프로그래머가 선택하는 자바, 스칼라, 파이썬의 객체
이러한 객체에는 사용자가 원하는 포맷을 사용해 원하는 모든 데이터를 저장할 수 있음
모든 값을 다루거나 값 사이의 상호작용 과정은 반드시 수동으로 정의
spark에서는 RDD레코드의 내부 구조를 파악할 수 없으므로 수작업으로 최적화
두 가지 타입의 RDD를 만들 수 있음
RDD의 주요 속성
이러한 속성은 사용자 프로그램을 스케줄링하고 실행하는 스파크의 모든 처리 방식을 결정
또한 RDD 역시 트랜스포메이션, 액션 제공
언어별 성능 차이
spark.range(10).rdd
Out[2]: MapPartitionsRDD[5] at javaToPython at NativeMethodAccessorImpl.java:0
#위에서 만들어진 데이터를 처리하려면 Row 객체를 올바른 데이터 타입으로 변환하거나 Row 객체에서 값을 추출해야함
spark.range(10).toDF('id').rdd.map(lambda row: row[0])
Out[3]: PythonRDD[12] at RDD at PythonRDD.scala:58
#RDD -> DataFrame
spark.range(10).rdd.toDF()
Out[4]: DataFrame[id: bigint]
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple".split(' ')
words = spark.sparkContext.parallelize(myCollection, 2) #파티션 수: 2
#RDD에 이름을 지정하면 스파크 UI에 지정한 이름으로 RDD가 표시됨
words.setName('myWords')
words.name()
Out[6]: 'myWords'
#줄 단위로 텍스트 파일을 RDD로 읽음
spark.sparkContext.textFile('/FileStore/tables/withTextFiles')
Out[7]: /FileStore/tables/withTextFiles MapPartitionsRDD[26] at textFile at NativeMethodAccessorImpl.java:0
#여러 텍스트 파일의 각 줄을 레코드로 가진 RDD를 생성함
spark.sparkContext.wholeTextFiles('/FileStore/tables/withTextFiles')
Out[8]: org.apache.spark.api.java.JavaPairRDD@613b96a0
#distinct
words.distinct().count()
Out[9]: 10
#조건 함수
def startsWithS(individual):
return individual.startswith("S")
words.filter(lambda word: startsWithS(word)).collect()
Out[11]: ['Spark', 'Simple']
words2=words.map(lambda word: (word, word[0], word.startswith("S")))
words2.filter(lambda record: record[2]).take(5)
Out[13]: [('Spark', 'S', True), ('Simple', 'S', True)]
words.flatMap(lambda word: list(word)).take(5)
Out[14]: ['S', 'p', 'a', 'r', 'k']
words.sortBy(lambda word: len(word)*-1).take(5)
Out[15]: ['Definitive', 'Processing', 'Simple', 'Spark', 'Guide']
fiftyFiftySplit = words.randomSplit([0.5,0.5])
len(fiftyFiftySplit)
Out[17]: 2
fiftyFiftySplit[0].collect()
Out[18]: ['Definitive', ':', 'Simple']
fiftyFiftySplit[1].collect()
Out[19]: ['Spark', 'The', 'Guide', 'Big', 'Data', 'Processing', 'Made']
spark.sparkContext.parallelize(range(1,21)).reduce(lambda x,y: x+y)
def wordLengthReducer(leftWord, rightWord):
if len(leftWord)>len(rightWord):
return leftWord
return rightWord
words.reduce(wordLengthReducer)
Out[22]: 'Processing'
words.count()
Out[23]: 10
confidence=0.95
timeout=400
words.countApprox(timeout, confidence)
Out[24]: 10
words.countApproxDistinct(0.05)
words.countApproxDistinct(0.000017)
IllegalArgumentException Traceback (most recent call last) <command-1697308693987943> in <module>
----> 1 words.countApproxDistinct(0.000017)
/databricks/spark/python/pyspark/rdd.py in countApproxDistinct(self, relativeSD)
2734 # the hash space in Java is 2^32
2735 hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
-> 2736 return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
2737
2738 def toLocalIterator(self, prefetchPartitions=False):
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
121 # Hide where the exception came from that shows a non-Pythonic
122 # JVM exception message.
--> 123 raise converted from None
124 else:
125 raise
IllegalArgumentException: requirement failed: accuracy (1.7E-5) must be greater than 0.000017
words.countByValue()
words.first()
spark.sparkContext.parallelize(range(1,20)).max()
spark.sparkContext.parallelize(range(1,20)).min()
words.take(5)
#정렬
words.takeOrdered(5)
#최상윗값
words.top(5)
words.saveAsTextFile('/FileStore/tables/bookeTitle')
words.saveAsSequenceFile('/')
words.saveAsHadoopFile('/')
words.cache()
#저장 위치 지정
spark.sparkContext.setCheckpointDir('/FileStore/check')
words.checkpoint()
#wc -l: 파일 내 전체 라인 수 출력
words.pipe('wc -l').collect()
words.mapPartitions(lambda part:[1]).sum()
def indexedFunc(partitionIndex, withPartIterator):
return [f"partition: {partitionIndex}=>{x for x in withPartIterator}"]
words.mapPartitionsWithIndex(indexedFunc).collect()
spark.sparkContext.parallelize(['hello','world'],2).glom().collect()
spark.sparkContext.parallelize(['hello','world'],1).glom().collect()