from datetime import datetime

from pyspark.sql.functions import lit

# List of tables to be counted
TABLES = ['db1.table1', 'db1.table2',
          'db2.table3', 'db2.table4',
          'db3.table5', 'db3.table6']

# List to keep the dictionaries of table_name and the respective count
table_count = []

# Function to get the record count of a table.
# `spark` is assumed to be an already active SparkSession (e.g. in a notebook).
def get_count(table: str) -> dict:
    count_dict = {}
    count_dict['table_name'] = table
    try:
        count = spark.read.table(table).count()
        count_dict['count'] = count
    except Exception:
        # If the table cannot be read, record the count as 0
        count_dict['count'] = 0
    return count_dict

def main():
    # Count the tables one by one, sequentially
    for table in TABLES:
        table_count.append(get_count(table))

if __name__ == "__main__":
    main()

    # Creating a DataFrame from the list of count dictionaries
    count_df = spark.createDataFrame(table_count)\
        .withColumn("date", lit(datetime.now().date()))

    # Writing into the control table
    count_df.coalesce(1).write.insertInto("control_db.counts_table")
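As a rough way to see how long this sequential version takes end to end, the loop can be wrapped in a simple wall-clock measurement, and the same wrapper can later be put around the threaded versions for comparison. This is only a minimal sketch, assuming get_count and TABLES are defined as above; the timing code is not part of the original job.

import time

start = time.perf_counter()
sequential_counts = [get_count(table) for table in TABLES]   # one table at a time
elapsed = time.perf_counter() - start
print(f"counted {len(sequential_counts)} tables sequentially in {elapsed:.1f}s")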
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime

from pyspark.sql.functions import lit

# List of tables to be counted
TABLES = ['db1.table1', 'db1.table2',
          'db2.table3', 'db2.table4',
          'db3.table5', 'db3.table6']

# Function to get the record count of a table.
def get_count(table: str) -> dict:
    ...
    # same as before
    return count_dict

# Implementation using ThreadPoolExecutor
def main():
    with ThreadPoolExecutor(max_workers=6) as executor:
        # map() submits one count job per table and returns the results in input order
        counts = list(executor.map(get_count, TABLES))
    return counts

if __name__ == "__main__":
    table_count = main()

    # Creating a DataFrame from the list of count dictionaries
    count_df = spark.createDataFrame(table_count)\
        .withColumn("date", lit(datetime.now().date()))

    # Writing into the control table
    count_df.coalesce(1).write.insertInto("control_db.counts_table")
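A general caveat with executor.map(): if the worker function raises an exception, it is only re-raised when that result is pulled from the returned iterator, and the iteration stops there, so the remaining results are lost and it is harder to tell which table failed. The toy example below (with a hypothetical failing task instead of a real Spark count) illustrates this behaviour, which is what motivates the submit()/as_completed() version in the next section.

from concurrent.futures import ThreadPoolExecutor

def toy_count(table: str) -> dict:
    # Hypothetical stand-in for get_count(): one table fails on purpose
    if table == 'db2.table3':
        raise RuntimeError(f"cannot read {table}")
    return {'table_name': table, 'count': 42}

with ThreadPoolExecutor(max_workers=3) as executor:
    results = executor.map(toy_count, ['db1.table1', 'db2.table3', 'db3.table5'])

try:
    for result in results:
        print(result)
except RuntimeError as e:
    # Raised while iterating, at the position of the failed task;
    # the results that would have come after it are never seen
    print("map iteration stopped:", e)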
# With exception handling to make debugging easier
from concurrent.futures import ThreadPoolExecutor, as_completed

def main(TABLES: list) -> list:
    """Main method to submit the count jobs in parallel.

    Args:
        TABLES (list): List of table names.

    Returns:
        list: List of count dictionaries, one per table.

    Raises:
        Exception: Re-raised if any of the count jobs fails.
    """
    count_list = []
    with ThreadPoolExecutor(max_workers=6) as executor:
        to_do_map = {}
        for table in TABLES:
            # Submitting the jobs in parallel
            future = executor.submit(get_count, table)
            print(f"scheduled for {table}: {future}")
            to_do_map[future] = table

        # Iterate over the futures in the order in which they complete
        done_iter = as_completed(to_do_map)
        for future in done_iter:
            try:
                count = future.result()
                print("result: ", count)
                count_list.append(count)
            except Exception as e:
                # to_do_map[future] holds the table whose count job failed
                raise e
    return count_list
A quick reference for the concurrent.futures pieces used above:

- executor.submit(func, arg): schedules the call func(arg) to run in the background and returns a Future object.
- Future: an object representing work that has not finished yet and whose result will become available later; the result is obtained with result().
- as_completed(): returns the Future objects as an iterable, in the order in which they complete.
- future.result(): retrieves the result of the job; if the job raised an exception, it is re-raised at this point and can be caught here.

In other words, because Spark executes the Python work in a multi-process structure rather than a multi-threaded one, it is not affected by the GIL.
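To make these pieces concrete, here is a tiny self-contained example, independent of Spark, using a hypothetical slow_task function: submit() returns the Futures immediately, as_completed() yields them as they finish, and result() either returns the value or re-raises the exception from the worker thread.

from concurrent.futures import ThreadPoolExecutor, as_completed
import random
import time

def slow_task(name: str) -> str:
    # Hypothetical job: sleeps for a random time and fails for one input
    time.sleep(random.uniform(0.1, 0.5))
    if name == "c":
        raise ValueError("task c failed")
    return f"{name} done"

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(slow_task, n): n for n in ["a", "b", "c"]}
    for future in as_completed(futures):           # yielded in completion order
        name = futures[future]
        try:
            print(name, "->", future.result())     # value, or re-raised exception
        except ValueError as e:
            print(name, "-> failed:", e)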
When an action (e.g. .collect() or .count()) is executed in Spark, the following steps take place: