TIL no.19-병렬성(Parallelism)

백선호·2021년 7월 2일
1

TIL

목록 보기
17/39
post-thumbnail

동시성(Concurrency)과 병렬성(Parallelism)이란?


동시성이란 흔히 말하는 멀티태스킹입니다. 우리가 어떠한 일을 할 때 '나는 멀티가 안돼'라는 말을 종종 하는 사람을 볼 수 있을 것이다. 맞는 얘기다 인간은 멀티, 즉 멀티태스킹이 될 수가 없다. 그러나 컴퓨터는 멀티태스킹을 지원한다. 예를 들어 노래를 들으면서 인터넷 쇼핑을 할 수 있는 것처럼 말이다. 동시성이 한순간에 하나의 일만을 처리하지만 별령성은 진짜로 동시에 여러 개의 일을 처리하는 것을 의미한다. 마치 '일꾼'이 여럿이어서 여러 개의 일을 처리하는 것을 별령성이라고 한다. 지금까지의 설명을 들어보면 별령성의 단점을 찾아볼 수 없어 보이지만 별령성은 물리적인 코어의 개수에 제한이 있다. 보통의 사용자들은 쿼드코어를 사용하고 있으면 cpu는 많아야 8개 정도이다. 즉 한순간에 처리할 수 있는 양이 제한적이게 된다.

병렬성(비동기 작업을 위한 futures)

futures를 써서 동시성 프로그래밍을 하는 방법은 2가지가 있는데 첫 번째는 concurrent.futures의 map 함수를 쓰는 것이고, 두 번째는 wait과 as_completed를 쓰는 것이다.

1) concurrent.futures의 map 함수

import os
import time
from concurrent import futures

WORK_LIST = [100000, 1000000, 10000000, 10000000]

# 동시성 합계 계산 메인함수
# 누적 합계 함수(제네레이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))

def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))
    # 시작 시간
    start_tm = time.time()
    # 결과 건수
    # ProcessPoolExecutor
    with futures.ThreadPoolExecutor() as excutor:
        # map -> 작업 순서 유지, 즉시 실행
        result = excutor.map(sum_generator, WORK_LIST)
    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포멧
    msg = '\n Result -> {} Time : {:.2f}s'
    # 최종 결과 출력
    print(msg.format(list(result), end_tm))

# 실행 (시작점이 있어야 futures 정상 사용이 가능하기 때문에 따로 함수를 만듬)
if __name__ == '__main__':
    main()
  
   Result -> [5000050000, 500000500000, 50000005000000, 50000005000000] Time : 1.33s
  
  

상위 코드에서 ThreadPoolExecutor라고 된 부분을 ProcessPoolExecutor로 바꿔주면 프로세스를 이용해서 작업할 수도 있다. 쉽게 생각해서, 스레드를 이용해서 동시성 프로그래밍을 하려면 ThreadPoolExecutor를, 프로세스를 이용하려면 ProcessPoolExecutor를 사용하면 된다.

2) wait과 as_completed


import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [10000, 100000, 1000000, 10000000]


# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제레네이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))
# wait
# as_completed
def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))  #10보다 것으로 스레드 만들기

    
    # 시작 시간
    start_tm = time.time()
    # Futures
    futures_list = []

    # 결과 건수
    # ProcessPoolExecutor
    with ThreadPoolExecutor() as excutor:
        for work in WORK_LIST:
            # future 반환 (실제로 일을 하지는 않음)
            future = excutor.submit(sum_generator, work)
            # 스케쥴링
            futures_list.append(future)
          
        
        # wait 결과 출력 7초까지만 기다리고 그 때까지 못 하는 일은 실패로 간주하고 중단
        result = wait(futures_list, timeout=7)

    # wait 결과 출력
         result = wait(futures_list, timeout=7)
         # 성공
         print('Completed Tasks : ' + str(result.done))
         # 실패
         print('Pending ones after waiting for 7seconds : ' + str(result.not_done))
         # 결과 값 출력
         print([future.result() for future in result.done])
        
        
        
            
    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포멧
    msg = '\n Time : {:.2f}s'
    # 최종 결과 출력
    print(msg.format(end_tm))



# 실행
if __name__ == '__main__':
    main()

Pending ones after waiting for 7seconds : set()
[500000500000, 50005000, 50000005000000, 5000050000]

 Time : 0.53s

첫 번째 방법을 사용했던 것보다 코드가 길어졌다. futures_list라는 빈 리스트를 만들어서 미래에 할 일들을 담아 놓는다. with 문을 지나 for 문으로 executor.submit(작업 함수, 각개 일) 메서드를 실행해 반환된 작업을 future에 할랑한 후 다시 futures_list에 넣는다. result에 timeout 7초를 정해서 7초가 넘으면 실패로 간주하고 wait의 결과를 출력한다.


import os
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, wait, as_completed

WORK_LIST = [10000, 100000, 1000000, 10000000]


# 동시성 합계 계산 메인 함수
# 누적 합계 함수(제레네이터)
def sum_generator(n):
    return sum(n for n in range(1, n+1))
# wait
# as_completed
def main():
    # Worker Count
    worker = min(10, len(WORK_LIST))  #10보다 것으로 스레드 만들기

    
    # 시작 시간
    start_tm = time.time()
    # Futures
    futures_list = []

    # 결과 건수
    # ProcessPoolExecutor
    with ThreadPoolExecutor() as excutor:
        for work in WORK_LIST:
            # future 반환 (실제로 일을 하지는 않음)
            future = excutor.submit(sum_generator, work)
            # 스케쥴링
            futures_list.append(future)
        
        # as_completed 결과 출력
        for future in as_completed(futures_list):
            result = future.result()
            done = future.done()
            cancelled = future.cancelled
            
            # future 결과 확인
            print('Future Result : {}, Done : {}'.format(result, done))
            print('Future Cancelled : {}'.format(cancelled))
        
        
            
    # 종료 시간
    end_tm = time.time() - start_tm
    # 출력 포멧
    msg = '\n Time : {:.2f}s'
    # 최종 결과 출력
    print(msg.format(end_tm))



# 실행
if __name__ == '__main__':
    main()
    
    
    Future Result : 5000050000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x7feaed4cfe20 state=finished returned int>>
Future Result : 50005000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x7feaed4cfca0 state=finished returned int>>
Future Result : 500000500000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x7feaef058670 state=finished returned int>>
Future Result : 50000005000000, Done : True
Future Cancelled : <bound method Future.cancelled of <Future at 0x7feaef058970 state=finished returned int>>

 Time : 0.56s

as_completed 결과도 출력도 가능하다. as_completed 호출 결과로 for 문으로 돌려 추출된 결괏값을 result에 할당하고 future.done()으로 작업이 완료됐는지(True/False), future.cancelled()는 작업이 취소됐는지(True/False)를 확인할 수 있다.

결과적으로 한 번에 한 개의 작업이 가능한 프로그램을 사용하는 것보다 더 효율적으로 사용이 가능하다.

profile
baik9261@gmail.com

0개의 댓글