pySpark13 - DataFrame Graph

박성현 · June 16, 2024


Functions used

pyspark.sql.functions.collect_set(col):

Aggregate function: returns a set of objects with duplicate elements removed.
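To make the difference from collect_list concrete without a Spark session, here is a plain-Python model of groupBy(key).agg(collect_set(value)), run on the first six rows of the sample data printed in the hands-on section. This is an illustrative analogue, not Spark's implementation, and the helper names collect_list_by and collect_set_by are hypothetical. Spark leaves the element order of the collected array unspecified, so the sketch sorts the set only to make its output stable.

```python
from collections import defaultdict

# First six (hero1, hero2) rows of hero-network.csv, as printed by df.show(10)
edges = [
    ("LITTLE, ABNER", "PRINCESS ZANDA"),
    ("LITTLE, ABNER", "BLACK PANTHER/T'CHAL"),
    ("BLACK PANTHER/T'CHAL", "PRINCESS ZANDA"),
    ("LITTLE, ABNER", "PRINCESS ZANDA"),
    ("LITTLE, ABNER", "BLACK PANTHER/T'CHAL"),
    ("BLACK PANTHER/T'CHAL", "PRINCESS ZANDA"),
]


def collect_list_by(pairs):
    """Hypothetical analogue of groupBy(key).agg(collect_list(value)): keeps duplicates."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def collect_set_by(pairs):
    """Hypothetical analogue of groupBy(key).agg(collect_set(value)): drops duplicates.

    Spark does not guarantee element order; sorting here only makes the result stable.
    """
    groups = defaultdict(set)
    for key, value in pairs:
        groups[key].add(value)
    return {key: sorted(values) for key, values in groups.items()}


print(collect_list_by(edges)["LITTLE, ABNER"])  # 4 entries, pairs repeated
print(collect_set_by(edges)["LITTLE, ABNER"])   # 2 distinct neighbours
```

Because the raw edge list repeats pairs (rows 1 and 4 of the sample are identical), collect_set rather than collect_list is what keeps each hero's neighbour list free of duplicates.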

pyspark.sql.functions.concat_ws(sep, *cols):

Concatenates multiple string columns of a DataFrame into a single string column, using the given separator.
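A plain-Python model of what concat_ws does to a single row may help. As far as I know, Spark's concat_ws skips null inputs and also accepts array-of-string columns, joining their elements; the sketch below mirrors that behaviour under those assumptions. The function name concat_ws_row is hypothetical and this is not Spark's code.

```python
def concat_ws_row(sep, *values):
    """Hypothetical plain-Python analogue of Spark's concat_ws for one row.

    Each value may be a string, None, or a list of strings (like an
    array<string> column). None values and None list elements are skipped.
    """
    parts = []
    for value in values:
        if value is None:
            continue  # null column value: skipped, no empty slot is produced
        if isinstance(value, list):
            # array column: its non-null elements are joined individually
            parts.extend(item for item in value if item is not None)
        else:
            parts.append(value)
    return sep.join(parts)


print(concat_ws_row("-", "a", None, "b"))                              # a-b
print(concat_ws_row(",", ["PRINCESS ZANDA", "BLACK PANTHER/T'CHAL"]))  # PRINCESS ZANDA,BLACK PANTHER/T'CHAL
```

The array case is the one Step 2 relies on: it turns the collect_set array into one comma-separated string.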

Hands-on

The dataset stores related heroes as a graph: each row is one edge connecting hero1 and hero2.

from pyspark.sql import functions as f, SparkSession, types as t


spark = SparkSession.builder.appName("df_study").getOrCreate()
# Each row of hero-network.csv is one edge: a pair of related heroes (hero1, hero2)
csv_file_path = "file:///home/jovyan/work/sample/hero-network.csv"
df = spark.read.option("header", "True").option("inferSchema", "True").csv(csv_file_path)
df.show(10)

+--------------------+--------------------+
|               hero1|               hero2|
+--------------------+--------------------+
|       LITTLE, ABNER|      PRINCESS ZANDA|
|       LITTLE, ABNER|BLACK PANTHER/T'CHAL|
|BLACK PANTHER/T'CHAL|      PRINCESS ZANDA|
|       LITTLE, ABNER|      PRINCESS ZANDA|
|       LITTLE, ABNER|BLACK PANTHER/T'CHAL|
|BLACK PANTHER/T'CHAL|      PRINCESS ZANDA|
|STEELE, SIMON/WOLFGA|    FORTUNE, DOMINIC|
|STEELE, SIMON/WOLFGA| ERWIN, CLYTEMNESTRA|
|STEELE, SIMON/WOLFGA|IRON MAN/TONY STARK |
|STEELE, SIMON/WOLFGA|IRON MAN IV/JAMES R.|
+--------------------+--------------------+
only showing top 10 rows

Step 1: For each hero1, collect all related hero2 values into one array


data = df.groupBy("hero1").agg(f.collect_set("hero2").alias("connection"))\
            .withColumnRenamed("hero1", "hero")
data.show(10)


+--------------------+--------------------+
|                hero|          connection|
+--------------------+--------------------+
|             ABCISSA|[ELSIE DEE, FURY,...|
|             ABSALOM|[SHATTERSTAR II/G...|
|ABSORBING MAN | MUTA|[DRAX | MUTANT X-...|
|ABSORBING MAN/CARL C|[SOMMERS, APRIL, ...|
|ADAMS, CONGRESSMAN H|[SPIDER-MAN/PETER...|
| ADAMS, NICOLE NIKKI|[JUSTICE II/VANCE...|
|    ADAMSON, REBECCA|[KABALLA, GOLEM I...|
|               ADRIA|[DORMAMMU, ANCIEN...|
|   ADVENT/KYLE GROBE|[JUSTICE II/VANCE...|
|AGAMEMNON II/ANDREI |[BLACK WIDOW/NATA...|
+--------------------+--------------------+
only showing top 10 rows

Step 2: Turn the array [...] into a single comma-separated string


data = data.withColumn('connection', f.concat_ws(',', f.col('connection')))
data.show(10)

+--------------------+--------------------+
|                hero|          connection|
+--------------------+--------------------+
|             ABCISSA|ELSIE DEE,FURY, C...|
|             ABSALOM|SHATTERSTAR II/GA...|
|ABSORBING MAN | MUTA|DRAX | MUTANT X-V...|
|ABSORBING MAN/CARL C|SOMMERS, APRIL,HE...|
|ADAMS, CONGRESSMAN H|SPIDER-MAN/PETER ...|
| ADAMS, NICOLE NIKKI|JUSTICE II/VANCE ...|
|    ADAMSON, REBECCA|KABALLA,GOLEM III...|
|               ADRIA|DORMAMMU,ANCIENT ...|
|   ADVENT/KYLE GROBE|JUSTICE II/VANCE ...|
|AGAMEMNON II/ANDREI |BLACK WIDOW/NATASHA |
+--------------------+--------------------+

Step 3: Split the string on ',' and count the pieces with size

data.withColumn('connection_size', f.size(f.split(f.col('connection'), ',')))\
    .orderBy(f.desc('connection_size')).show(10)


+--------------------+--------------------+---------------+
|                hero|          connection|connection_size|
+--------------------+--------------------+---------------+
|     CAPTAIN AMERICA|URICH, DORIS,ARMA...|           1795|
|SPIDER-MAN/PETER PAR|RED SHIFT,GAMELIN...|           1737|
|IRON MAN/TONY STARK |RED SHIFT,SABRETO...|           1443|
|    WOLVERINE/LOGAN |SABRETOOTH/VICTOR...|           1278|
|THING/BENJAMIN J. GR|CHORD, ANDREW,CAT...|           1262|
|SCARLET WITCH/WANDA |SABRETOOTH/VICTOR...|           1246|
|HUMAN TORCH/JOHNNY S|CAT KING,BUZZ,MAK...|           1202|
|MR. FANTASTIC/REED R|ARMADILLO/ANTONIO...|           1200|
|THOR/DR. DONALD BLAK|PARKER, MAY | TIM...|           1183|
|INVISIBLE WOMAN/SUE |CAPTAIN MARVEL II...|           1143|
+--------------------+--------------------+---------------+
only showing top 10 rows
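One caveat with Step 3: many hero names contain a comma themselves (for example "LITTLE, ABNER" or "FORTUNE, DOMINIC" in the sample rows), so once the array is joined with ',' the name boundaries are lost, splitting on ',' yields more pieces than there are neighbours, and connection_size can overcount. The plain-Python check below illustrates this with the four neighbours of "STEELE, SIMON/WOLFGA" from the first ten rows; the helper names size_via_split and size_via_array are hypothetical.

```python
# Neighbours of "STEELE, SIMON/WOLFGA" taken from the first rows of df.show(10)
neighbours = [
    "FORTUNE, DOMINIC",
    "ERWIN, CLYTEMNESTRA",
    "IRON MAN/TONY STARK ",
    "IRON MAN IV/JAMES R.",
]


def size_via_split(names, sep=","):
    """Mimics Step 2 + Step 3: join with the separator, then count split pieces."""
    joined = sep.join(names)
    return len(joined.split(sep))


def size_via_array(names):
    """Mimics taking the size of the collect_set array itself: one entry per neighbour."""
    return len(names)


print(size_via_split(neighbours))  # 6: each comma inside a name adds an extra piece
print(size_via_array(neighbours))  # 4
```

In PySpark, one way to avoid this (a suggestion, not run against this dataset) is to count before joining, for example df.groupBy('hero1').agg(f.size(f.collect_set('hero2')).alias('connection_size')), or equivalently f.countDistinct('hero2'). The exact counts in the table above would likely change.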