2004년, MapReduce on Simplified Data Processing on Large Clusters
2004~2005년, NDFS Project
2006~2007년, Apach Hado
2007~2008년, 폭발적 성장
2009~2013년, Apache Spark
2014~2020년, Databricks 와 Apach Spark
스쿱(Squoop): RDBMS(오라클,Mysql)~하둡 (데이터 이동)
플럼(Flume): 분산 환경에서 데이터 수집하여 병합한 후에 전송
HDFS: 분산 처리 파일 시스템
MapReduce: 자바 기반의 프로그래밍 모델
Yarn: 하둡 클러스터 자원 관리
Spark: In-memory기반의 클러스터링 컴퓨터 데이터 처리
Pig: 어려운 데이터 처리 과정으로 filter, join,query등 실행
Impala: 고성능의 SQL엔진
Hive: sql기능 제공
Cloudera Search: real-time을 통해 데이터 검색
Hue: 웹 인터페이스 제공
우지(Oozie): 워크플로우 관리 및 Job 스케줄러
HBase: NoSQL기반으로 HDFS로 인해 처리된 데이터 저장
Zeppelin : 데이터 시각화
SparkMLlib: 머신러닝 관련 라이브러리
-프로그래밍언어: Java,Python, SQL, Scala,R
자원 관리: 하둡의 Yarn, Mesos사용 혹은 자체 기능 사용
데이터 저장: Local FS, HDFS이용, AWS의 S3인스턴스 이용등을 이용하여 유연한 확장성 강조
주 언어: Scala(Spark)
환경: 클러스터
Spark SQL: SQL 관련 작업
Streaming: Streaming 데이터 처리
MLlib: ML라이브러리
GraphX: Graph Processing
특징
from pyspark import SparkConf, SparkContext
sc = SparkContext()
rdd = sc.parallelize([1,2,3])
rdd
import os
file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/test.txt'
with open(file_path, 'w') as f:
for i in range(10):
f.write(str(i)+'\n')
print('OK')
rdd2 = sc.textFile(file_path)
print(rdd2)
print(type(rdd2))
rdd.take(3)
x = sc.parallelize(["b", "a", "c", "d"])
y = x.map(lambda z: (z, 1))
print(x.collect()) #collect()는 actions입니다.
print(y.collect())
text = sc.parallelize(['a', 'b', 'c', 'd'])
capital = text.map(lambda x: x.upper())
A = capital.filter(lambda x: 'A' in x)
print(text.collect()) #['a','b','c','d']
print(A.collect()) #['A']
wordsDataset = sc.parallelize(["Spark is funny", "It is beautiful", "And also It is implemented by python"])
result = wordsDataset.flatMap(lambda x: x.split()).filter(lambda x: x != " ").map(lambda x: x.lower())
# 공백은 제거합니다.
# 단어를 공백기준으로 split 합니다.
result.collect()
nums = sc.parallelize(list(range(10)))
nums.collect()
nums.take(3)
nums.count()
nums.reduce(lambda x, y: x + y)
file_path = os.getenv('HOME')+'/aiffel/bigdata_ecosystem/file.txt'
nums.saveAsTextFile(file_path)
!ls -l ~/aiffel/bigdata_ecosystem
# RDD 생성
rdd = sc.parallelize(range(1,100))
# RDD Transformation
rdd2 = rdd.map(lambda x: 0.5*x - 10).filter(lambda x: x > 0)
# RDD Action
rdd2.reduce(lambda x, y: x + y)
driver 프로그램 구동 시, 생성되는 특수 객체
1 by 1. 사용 후 종료해야함
문법: pyspark.SparkContext()
스파크 기능의 기본 엔트리 포인트
RDD 만들어서 브로드캐스트 사용
새로운 것 만들기 전에 활성 중지해야함
from pyspark import SparkConf, SparkContext
sc = SparkContext()
sc
type(sc)
sc.stop()
sc = SparkContext(master='local', appName='PySpark Basic')
sc
sc.getConf().getAll()
sc.stop()
conf = SparkConf().setAppName('PySpark Basic').setMaster('local')# 어플리케이션의 이름과 Master의 URL설정
sc = SparkContext(conf=conf)
sc
sc.stop()