createOrReplaceTempView : spark Session이 살아있는 동안 존재createOrReplaceGlobalTempView : spark 드라이버가 살아있는 동안 존재namegender_df.createOrReplaceTempView("namegender")
namegender_group_df = spark.sql("""
SELECT gender, count(1) FROM namegender GROUP BY 1
""")
print(namegender_group_df.collect())
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", "jdbc:redshift://HOST:PORT/DB?user=ID&password=PASSWORD") \
.option("dbtable", "raw_data.user_session_channel") \
.load()
!pip install pyspark==3.3.1 py4j==0.10.9.5
# 버전이 맞지 않아 특정 버전으로 다시 참조해야함.
!wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/2.1.0.28/redshift-jdbc42-2.1.0.28.jar -P /usr/local/lib/python3.10/dist-packages/pyspark/jars/
# 찾은 JDBC 바탕으로 세션 생성
from pyspark.sql import SparkSession
# SparkSession 생성
spark = SparkSession.builder \
.appName("Python Spark SQL #1") \
.config("spark.jars", "/usr/local/lib/python3.10/dist-packages/pyspark/jars/redshift-jdbc42-2.1.0.28.jar") \
.getOrCreate()
사용자 ID:
세션 ID:


# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://le666666edshift.amazonaws.com:5439/dev?user=66666&password=666666"
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_timestamp") \
.load()
df_session_transaction = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_transaction") \
.load()
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")
createOrReplaceTempView로 데이터프레임을 View로 생성top_rev_user_df = spark.sql("""
SELECT userid,
SUM(str.amount) revenue,
SUM(CASE WHEN str.refunded = False THEN str.amount END) net_revenue
FROM user_session_channel usc
JOIN session_transaction str ON usc.sessionid = str.sessionid
GROUP BY 1
ORDER BY 2 DESC
LIMIT 10""")

top_rev_user_df2 = spark.sql("""
SELECT
userid,
SUM(amount) total_amount,
RANK() OVER (ORDER BY SUM(amount) DESC) rank
FROM session_transaction st
JOIN user_session_channel usc ON st.sessionid = usc.sessionid
GROUP BY userid
ORDER BY rank
LIMIT 10""")

!wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/2.1.0.28/redshift-jdbc42-2.1.0.28.jar -P /usr/local/lib/python3.10/dist-packages/pyspark/jars/
from pyspark.sql import SparkSession
# SparkSession 생성
spark = SparkSession.builder \
.appName("Python Spark SQL #3") \
.config("spark.jars", "/usr/local/lib/python3.10/dist-packages/pyspark/jars/redshift-jdbc42-2.1.0.28.jar") \
.getOrCreate()
# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://l-svt.ap-northeast-2.re-9/dev?user=gues-34"
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_timestamp") \
.load()
df_session_transaction = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_transaction") \
.load()
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")
mon_channel_rev_df = spark.sql("""
SELECT LEFT(sti.ts, 7) year_month,
usc.channel channel,
COUNT(DISTINCT userid) total_visitors
FROM user_session_channel usc
LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
GROUP BY 1 ,2
ORDER BY 1, 2""")

mon_channel_rev_df = spark.sql("""
SELECT LEFT(sti.ts, 7) year_month,
usc.channel channel,
COUNT(DISTINCT userid) total_visitors,
COUNT(DISTINCT CASE WHEN amount is not NULL THEN userid END) paid_visitors
FROM user_session_channel usc
LEFT JOIN session_timestamp sti ON usc.sessionid = sti.sessionid
LEFT JOIN session_transaction str ON usc.sessionid = str.sessionid
GROUP BY 1 ,2
ORDER BY 1, 2""")

mon_channel_rev_df = spark.sql("""
SELECT LEFT(ts, 7) month,
usc.channel,
COUNT(DISTINCT userid) uniqueUsers,
COUNT(DISTINCT (CASE WHEN amount >= 0 THEN userid END)) paidUsers,
SUM(amount) grossRevenue,
SUM(CASE WHEN refunded is not True THEN amount END) netRevenue,
ROUND(COUNT(DISTINCT CASE WHEN amount >= 0 THEN userid END)*100
/ COUNT(DISTINCT userid), 2) conversionRate
FROM user_session_channel usc
LEFT JOIN session_timestamp t ON t.sessionid = usc.sessionid
LEFT JOIN session_transaction st ON st.sessionid = usc.sessionid
GROUP BY 1, 2
ORDER BY 1, 2;
""")




#세션 설정
from pyspark.sql import SparkSession
# SparkSession 생성
spark = SparkSession.builder \
.appName("Python Spark SQL #2") \
.config("spark.jars", "/usr/local/lib/python3.10/dist-packages/pyspark/jars/redshift-jdbc42-2.1.0.28.jar") \
.getOrCreate()
# Redshift와 연결해서 DataFrame으로 로딩하기
url = "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234"
df_user_session_channel = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.user_session_channel") \
.load()
df_session_timestamp = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_timestamp") \
.load()
df_session_transaction = spark.read \
.format("jdbc") \
.option("driver", "com.amazon.redshift.jdbc42.Driver") \
.option("url", url) \
.option("dbtable", "raw_data.session_transaction") \
.load()
df_user_session_channel.createOrReplaceTempView("user_session_channel")
df_session_timestamp.createOrReplaceTempView("session_timestamp")
df_session_transaction.createOrReplaceTempView("session_transaction")
first_last_channel_df = spark.sql("""
WITH RECORD AS (
SELECT /*사용자의 유입에 따른, 채널 순서 매기는 쿼리*/
userid,
channel,
ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts ASC) AS seq_first,
ROW_NUMBER() OVER (PARTITION BY userid ORDER BY ts DESC) AS seq_last
FROM user_session_channel u
LEFT JOIN session_timestamp t
ON u.sessionid = t.sessionid
)
SELECT /*유저의 첫번째 유입채널, 마지막 유입 채널 구하기*/
f.userid,
f.channel first_channel,
l.channel last_channel
FROM RECORD f
INNER JOIN RECORD l ON f.userid = l.userid
WHERE f.seq_first = 1 and l.seq_last = 1
ORDER BY userid
""")
first_last_channel_df2 = spark.sql("""
SELECT DISTINCT A.userid,
FIRST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS First_Channel,
LAST_VALUE(A.channel) over(partition by A.userid order by B.ts
rows between unbounded preceding and unbounded following) AS Last_Channel
FROM user_session_channel A
LEFT JOIN session_timestamp B
ON A.sessionid = B.sessionid""")
이런식으로도 같은 결과를 얻을 수 있음



select * from Vital v
join Alert a ON v.vitalID = a.vitalID;


select *
from raw_data.Vital v LEFT JOIN raw_data.Alert a ON v.vitalID = a.vitalID;


SELECT * FROM raw_data.Vital v
FULL JOIN raw_data.Alert a ON v.vitalID = a.vitalID;


SELECT * FROM raw_data.Vital v CROSS JOIN raw_data.Alert a;
왼쪽 테이블과 오른쪽 테이블의 모든 레코드들의 조합을 리턴

경우의 수를 생각하면 쉬운데 n개와 m개의 테이블이 있으면 cross join을 하면 n * m 개의 레코드가 나옴

SELECT * FROM raw_data.Vital v1
JOIN raw_data.Vital v2 ON v1.vitalID = v2.vitalID;


spark.sql.autoBroadcastjoinThreshold 파라미터로 충분히 작은지 여부 결정.withColumn함수와 같이 사용하는 것이 일반적이고, spark sql에서도 사용 가능import pyspark.sql.functions as F
from pyspark.sql.types import *
#입력값 z를 대문자로 변환하는 간단한 함수
upperUDF = F.udf(lambda z:z.upper())
#Curated Name이라는 새로운 열을 추가하고 이 열의 값은 Name을 통해 기존 Name을 대문자로 바꾸는 것
df.withColumn("Curated Name", upperUDF("Name"))

def upper(s):
return s.upper()
# 먼저 테스트
upperUDF = spark.udf.register("upper", upper)
spark.sql("SELECT upper('aBcD')").show()
# DataFrame 기반 SQL에 적용
df.createOrReplaceTempView("test")
spark.sql("""SELECT name, upper(name) "Curated Name" FROM test""").show()
data = [
{"a": 1, "b": 2},
{"a": 5, "b": 5}
]
df = spark.createDataFrame(data)
df.withColumn("c", F.udf(lambda x, y: x + y)("a", "b"))
.withColums로 추가def plus(x, y):
return x + y
#spark.udf.register를 사용하여 정의한 upper함수를 upper라는 이름으로 UDF에 등록
plusUDF = spark.udf.register("plus", plus)
spark.sql("SELECT plus(1, 2)").show()
df.createOrReplaceTempView("test")
spark.sql("SELECT a, b, plus(a, b) c FROM test").show()
!wget https://s3-geospatial.s3.us-west-2.amazonaws.com/orders.csv
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark UDF") \
.getOrCreate()

from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, LongType
order = spark.read.options(delimiter='\t').option("header","true").csv("orders.csv")
options(delimiter='\t'는 csv 파일 필드 구분자
struct = ArrayType(
StructType([
StructField("name", StringType()),
StructField("id", StringType()),
StructField("quantity", LongType())
])
)
order.withColumn("item", explode(from_json("items", struct))).show(truncate=False)
from_json("items", struct)은 item 열의 json 데이터를 정의한 스키마 struct를 사용하여 구조화된 데이터로 변환truncate=False은 긴 문자열도 잘리지 않고 모두 표시order_items = order.withColumn("item", explode(from_json("items", struct))).drop("items")
order_items.show(5)
order_items.printSchema()
스키마 구조와 상위 5개 레코드 확인하는 쿼리
order_items.createOrReplaceTempView("order_items")
spark.sql("""SELECT item.quantity FROM order_items WHERE order_id = '1816674631892'""").show()
생성한 데이터 프레임을 임시 뷰로 등록
쿼리문
spark.catalog.listTables()
for f in spark.catalog.listFunctions():
print(f[0])
spark.catalog.listTables() 현재 세션에 등록된 테이블의 목록을 반환spark.catalog.listFunctions() 현재 세션에 등록된 함수 목록 반환카탈로그 : 테이블과 뷰에 관한 메타 데이터 처리
테이블 관리 방식
테이블들은 데이터베이스라 부르는 폴더와 같은 구조로 관리

DB로 치면 스키마 - 테이블 느낌

Hive와 호환되는 메타스토어 사용:
SparkSession 생성 시 enableHiveSupport() 호출:
enableHiveSupport()를 호출함.from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark Hive") \
.enableHiveSupport() \
.getOrCreate()
테이블 생성 방법:
dataframe.saveAsTable("테이블이름")을 사용하여 데이터 프레임을 테이블로 저장할 수 있음.CREATE TABLE, CTAS)을 사용하여 테이블을 생성할 수 있음.데이터 저장 위치:
spark.sql.warehouse.dir가 가리키는 위치에 저장됨.테이블 타입의 선호:
HDFS에 존재하는 데이터 사용:
LOCATION이라는 프로퍼티를 사용함.메타데이터 관리:
테이블 생성 예시:
CREATE TABLE table_name (
column1 type1,
column2 type2,
column3 type3,
…
)
USING PARQUET
LOCATION 'hdfs_path';
enableHiveSupport()를 사용하여 Hive 메타스토어와 통합함.메타데이터 저장:
데이터 접근 관리:
통합 및 호환성:
SparkSession 생성 시 enableHiveSupport() 호출:
enableHiveSupport()를 호출하여 Hive 메타스토어를 사용하게 함.from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark Hive") \
.enableHiveSupport() \
.getOrCreate()
중앙 집중화 및 데이터 관리:
호환성:
Hive 메타스토어는 빅 데이터 환경에서 데이터 관리와 접근성을 높이기 위해 중요한 역할을 함.
SparkSession을 생성하여 Hive 메타스토어와 통합하는 방법은 Spark SQL을 사용하여 Hive 테이블과 메타데이터를 관리하고 쿼리하는 데 유용합니다. 이를 활용하여 다양한 데이터 작업을 수행할 수 있습니다. 다음은 SparkSession을 활용하는 몇 가지 방법입니다:
Hive 메타스토어를 사용하여 새로운 테이블을 생성하고 데이터를 삽입할 수 있습니다.
# 테이블 생성
spark.sql("CREATE TABLE IF NOT EXISTS test_table (id INT, name STRING)")
# 데이터 삽입
spark.sql("INSERT INTO test_table VALUES (1, 'Alice'), (2, 'Bob')")
기존에 Hive에 저장된 테이블을 조회할 수 있습니다.
# 테이블 조회
df = spark.sql("SELECT * FROM test_table")
df.show()
DataFrame을 생성하여 Hive 테이블로 저장할 수 있습니다.
# DataFrame 생성
data = [(3, 'Charlie'), (4, 'David')]
columns = ["id", "name"]
df = spark.createDataFrame(data, columns)
# DataFrame을 테이블로 저장
df.write.saveAsTable("test_table")
이미 HDFS에 존재하는 데이터를 External Table로 정의할 수 있습니다.
# External Table 생성
spark.sql("""
CREATE EXTERNAL TABLE IF NOT EXISTS external_table (
id INT,
name STRING
)
STORED AS PARQUET
LOCATION 'hdfs://path/to/data'
""")
Hive 메타스토어와 통합된 SparkSession을 사용하여 다양한 데이터 분석 작업을 수행할 수 있습니다.
# 데이터 분석 쿼리 실행
result = spark.sql("SELECT name, COUNT(*) as count FROM test_table GROUP BY name")
result.show()
Hive 메타스토어를 통해 데이터베이스와 테이블의 메타데이터를 관리할 수 있습니다.
# 데이터베이스 목록 조회
databases = spark.sql("SHOW DATABASES")
databases.show()
# 테이블 목록 조회
tables = spark.sql("SHOW TABLES IN default")
tables.show()