Java 8
버전을 다운로드 해준다.위에서 다운받은 jar 파일 경로를 참조하여 SparkSession을 만들어준다.
spark = SparkSession \
.builder \
.config("spark.driver.extraClassPath", "[postgresql jar 파일 경로]") \
.appName('pyspark-db-connect') \
.getOrCreate()
ip = "127.0.0.1"
port = "5432"
user = [User Name]
passwd = [User Password]
db = [DB Name]
table_name = [Table 이름]
df = spark.read.format("jdbc") \
.option("url", f"jdbc:postgresql://{ip}:{port}/{db}") \
.option("driver", "org.postgresql.Driver") \
.option("dbtable", table_name) \
.option("user", user) \
.option("password", passwd) \
.load()
df.show(5)
+--------------------+------+---------------------------------+---------------------------------+-----------+---+---------+
| uuid| name| job| residenct|blood_group|sex|birthdate|
+--------------------+------+---------------------------------+---------------------------------+-----------+---+---------+
|hiHBkVffEpBQYjnzH...|조시우| 웹 및 멀티미디어 디자이너| 인천광역시 서초구 영동대87로| AB-| F| 19190212|
|agpsZF4GY7BNvG93e...|이민준|기타 비금속제품관련 생산기 조작원| 대구광역시 용산구 역삼387길| AB+| F| 19811230|
|7ADnduVUeGHmsbPP2...|최영호| 식품공학 기술자 및 연구원| 울산광역시 금천구 삼성가| AB-| M| 20040127|
|E45xDDdVZejWnmUzS...|김상호| 화학제품 생산기 조작원|부산광역시 강북구 서초대가 (지...| AB-| M| 19420603|
|VfAHfJZGK4nkLFhfh...|서숙자| 기타 음식서비스 종사원| 전라북도 수원시 권선구 반포대길| O+| F| 19271101|
+--------------------+------+---------------------------------+---------------------------------+-----------+---+---------+
sql = "select * from temp"
df = spark.read.format("jdbc") \
.option("url", f"jdbc:postgresql://{ip}:{port}/{db}") \
.option("driver", "org.postgresql.Driver") \
.option("query", sql) \
.option("user", user) \
.option("password", passwd) \
.load()
df.show(5)
+--------------------+------+---------------------------------+---------------------------------+-----------+---+---------+
| uuid| name| job| residenct|blood_group|sex|birthdate|
+--------------------+------+---------------------------------+---------------------------------+-----------+---+---------+
|hiHBkVffEpBQYjnzH...|조시우| 웹 및 멀티미디어 디자이너| 인천광역시 서초구 영동대87로| AB-| F| 19190212|
|agpsZF4GY7BNvG93e...|이민준|기타 비금속제품관련 생산기 조작원| 대구광역시 용산구 역삼387길| AB+| F| 19811230|
|7ADnduVUeGHmsbPP2...|최영호| 식품공학 기술자 및 연구원| 울산광역시 금천구 삼성가| AB-| M| 20040127|
|E45xDDdVZejWnmUzS...|김상호| 화학제품 생산기 조작원|부산광역시 강북구 서초대가 (지...| AB-| M| 19420603|
|VfAHfJZGK4nkLFhfh...|서숙자| 기타 음식서비스 종사원| 전라북도 수원시 권선구 반포대길| O+| F| 19271101|
+--------------------+------+---------------------------------+---------------------------------+-----------+---+---------+
residenct
컬럼의 앞단어만 추출해서 시별로 group by 된 테이블을 저장해보도록 할 것이다.# residenct 컬럼의 앞 단어만 추출
df_ = df.withColumn('residenct', F.split(F.col('residenct'), " ").getItem(0))
# Group by count
df_group = df_.groupBy(F.col('residenct')).count().orderBy('count', ascending=False)
# Write to DB
df_group.write.format("jdbc") \
.option("url", f"jdbc:postgresql://{ip}:{port}/{db}")\
.option("driver", "org.postgresql.Driver") \
.option("dbtable", [저장할 Table 이름]) \
.option("user", user) \
.option("password", passwd) \
.save()
postgres=# SELECT * FROM group;
residenct | count
----------------+-------
울산광역시 | 12
충청남도 | 12
경기도 | 12
대전광역시 | 17
대구광역시 | 13
부산광역시 | 8
세종특별자치시 | 8
광주광역시 | 14
인천광역시 | 14
강원도 | 14
충청북도 | 14
전라북도 | 10
제주특별자치도 | 10
경상남도 | 11
경상북도 | 11
전라남도 | 9
서울특별시 | 6
(17개 행)
Platform Independent
유형을 선택하여 압축파일을 다운받아준다.mysql-connector-j-8.3.0.jar
파일을 찾을 수 있다.위에서 다운받은 jar 파일 경로를 참조하여 SparkSession을 만들어준다.
spark = SparkSession \
.builder \
.config("spark.driver.extraClassPath", "[mysql jar 파일 경로]") \
.appName('pyspark-db-connect') \
.getOrCreate()
ip = "127.0.0.1"
port = "3306"
user = [User Name]
passwd = [User Password]
db = [DB Name]
table_name = [Table 이름]
url = f"jdbc:mysql://{ip}:{port}/{db}"
properties = {
"user": user,
"password": passwd,
"driver": "com.mysql.jdbc.Driver"
}
table_name = table_name
df = spark.read.jdbc(url, table_name, properties=properties)
df = spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://{ip}:{port}/{db}") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", table_name) \
.option("user", user) \
.option("password", passwd) \
.load()
sql = "select * from dept"
df = spark.read.format("jdbc") \
.option("url", f"jdbc:mysql://{ip}:{port}/{db}") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("query", sql) \
.option("user", user) \
.option("password", passwd) \
.load()
deptno
컬럼이 20보다 큰 row만 filter하여 저장한다.sdf = df.filter(F.col('deptno') > 20)
sdf.write \
.format("jdbc") \
.option("driver","com.mysql.cj.jdbc.Driver") \
.option("url", f"jdbc:mysql://{ip}:{port}/{db}") \
.option("dbtable", [저장할 Table 이름]) \
.option("user", user) \
.option("password", passwd) \
.save()